fix(stream): allow DeferredReadableStream to be reused after detachSource #1018
mshivam019 wants to merge 1 commit into livekit:main
…urce

When a user disconnects and reconnects (e.g., cancels the browser's 'Leave site?' dialog), the agent's audio input stream throws a 'Stream source already set' error because:
1. detachSource() releases the reader lock but does not reset sourceReader
2. isSourceSet still returns true, so setSource() throws
3. The writer may be closed from the previous session

This fix:
- Resets sourceReader = undefined in detachSource() so isSourceSet returns false
- Tracks writerClosed state and recreates the transform in setSource() if needed
- Handles the race condition where pump() may still be running when detachSource() is called
Hey @mshivam019, the Consider an async task reading from this stream:

```typescript
const deferred = new DeferredReadableStream<string>();
const reader = deferred.stream.getReader();
while (true) {
  const { value, done } = await reader.read();
  if (done) break;
  // process value...
}
```

Once Regarding your scenarios:
Could you clarify: have you observed this deferred readable stream blocking the agent's progress during reconnection? If so, could you share more details about the specific scenario? That would help us better understand if there's an edge case we need to address.
@toubatbrian Thanks for the detailed explanation of the design rationale! Let me clarify my specific use case:

Scenario: Interview Session with Reconnection Grace Period

I'm building a voice AI interview application where users may accidentally:

For this, I want a 3-minute grace window where the user can rejoin the same room and continue the conversation with full context preserved.

How I've configured this:

Room creation (backend):

```typescript
await roomService.createRoom({
  name: roomName,
  metadata: roomMetadata,
  emptyTimeout: 300, // 5 minutes before any participant joins
  departureTimeout: 180, // 3 minutes grace period after last participant leaves
  maxParticipants: 10,
});
```

Agent session (keeps session alive when user disconnects):

```typescript
await session.start({
  agent: assistant,
  room: ctx.room,
  inputOptions: {
    noiseCancellation: BackgroundVoiceCancellation(),
    closeOnDisconnect: false, // Keep agent session alive for reconnection
  },
});
```

The Problem

When the user reconnects (same room, same identity):

Question

If making Should Happy to adjust the implementation based on your guidance!
Hey @mshivam019, I made a PR to support a new stream primitive, which could hopefully support the use case you want: #1036. Feel free to test on that branch and let me know if you encounter any issues!

Hey @toubatbrian, thanks for the pointer! In the meantime, I migrated my architecture to create a fresh room on reconnection instead: the backend preloads the conversation context into the new agent session, so the user picks up right where they left off without needing stream reuse. That eliminated the need for this PR on my end. That said, keeping the original room open would still be the better approach. With the new room architecture, there's a window where recent conversation data that's still in the agent's context but hasn't been persisted by the backend yet can get lost during the handoff. The stream reuse approach avoids that entirely since the session stays alive. I checked out #1036 and the deferred stream approach looks solid. I'll give it a try soon and report back if I run into anything.

I'm closing this PR since we've merged #1036
Description
Fix `DeferredReadableStream` to allow `setSource()` to be called again after `detachSource()`, enabling stream reuse during participant reconnection scenarios.

When a participant's connection is interrupted and restored (e.g., browser reload, network interruption, or user canceling the "Leave site?" confirmation dialog), the SDK's `ParticipantAudioInputStream` attempts to reattach the audio track via `onTrackSubscribed`. This triggers:

1. `closeStream()` → calls `deferredStream.detachSource()`
2. `deferredStream.setSource(newAudioStream)` to attach the new track

However, `setSource()` throws "Stream source already set" because:

1. `detachSource()` releases the reader lock but doesn't reset `sourceReader` to `undefined`
2. `isSourceSet` (which checks `!!this.sourceReader`) still returns `true`
3. `writer` may already be closed from the previous session

This makes it impossible to reuse the agent session after any participant reconnection event.
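The failure sequence above can be reproduced with a small stand-in class. This is a minimal sketch, not the SDK's code: `BuggyDeferredStream` is a hypothetical name, and it models only the `sourceReader` bookkeeping described in this PR, assuming the Web Streams globals available in recent Node.js versions.

```typescript
// Hypothetical, simplified stand-in for DeferredReadableStream (not the SDK's code).
class BuggyDeferredStream<T> {
  private sourceReader?: ReadableStreamDefaultReader<T>;

  get isSourceSet(): boolean {
    return !!this.sourceReader;
  }

  setSource(source: ReadableStream<T>): void {
    if (this.isSourceSet) {
      throw new Error('Stream source already set');
    }
    this.sourceReader = source.getReader();
  }

  detachSource(): void {
    // Releases the lock on the source but keeps the stale reader reference,
    // so isSourceSet keeps reporting true.
    this.sourceReader?.releaseLock();
  }
}

const buggy = new BuggyDeferredStream<string>();
buggy.setSource(new ReadableStream<string>()); // first track attached
buggy.detachSource(); // participant disconnects

let reattachError: string | undefined;
try {
  buggy.setSource(new ReadableStream<string>()); // participant reconnects
} catch (err) {
  reattachError = (err as Error).message;
}
console.log(reattachError); // prints: Stream source already set
```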
Changes Made

- Reset `sourceReader = undefined` in `detachSource()` so `isSourceSet` returns `false`
- Add `writerClosed` state tracking to detect when the transform needs recreation
- Recreate the transform in `setSource()` if the writer was previously closed
- Check `!this.sourceReader` before read in `pump()` to handle the race condition where `detachSource()` is called while pump is running

Pre-Review Checklist

Testing

- `deferred_stream.test.ts` pass
- `pnpm build:agents` and `pnpm test` pass
- `restaurant_agent.ts` and `realtime_agent.ts` work properly - N/A for this internal stream fix

Manual Testing Performed:
Additional Notes
This bug affects any scenario where a participant disconnects and reconnects to the same room session. Without this fix, the agent becomes unresponsive after reconnection because audio input cannot be reattached.
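For reference, the first three items under Changes Made could look roughly like the sketch below. It is an illustration under stated assumptions, not the diff in this PR: `ReattachableDeferredStream` and `closeWriter()` are hypothetical names, `closeWriter()` stands in for wherever the real class closes its writer, and the `pump()` loop with its `!this.sourceReader` guard is omitted to keep the example synchronous.

```typescript
// Hypothetical, simplified model of the fix (not the SDK's actual code).
// The pump loop that copies chunks from the source to the writer is omitted.
class ReattachableDeferredStream<T> {
  private transform = new TransformStream<T, T>();
  private writer = this.transform.writable.getWriter();
  private writerClosed = false;
  private sourceReader?: ReadableStreamDefaultReader<T>;

  get stream(): ReadableStream<T> {
    return this.transform.readable;
  }

  get isSourceSet(): boolean {
    return !!this.sourceReader;
  }

  setSource(source: ReadableStream<T>): void {
    if (this.isSourceSet) {
      throw new Error('Stream source already set');
    }
    if (this.writerClosed) {
      // A closed writer cannot accept new chunks, so start over with a fresh transform.
      this.transform = new TransformStream<T, T>();
      this.writer = this.transform.writable.getWriter();
      this.writerClosed = false;
    }
    this.sourceReader = source.getReader();
  }

  detachSource(): void {
    const reader = this.sourceReader;
    if (!reader) return;
    // Clear the reference before releasing the lock so isSourceSet reports false.
    this.sourceReader = undefined;
    reader.releaseLock();
  }

  // Hypothetical stand-in for wherever the real class closes its writer.
  closeWriter(): void {
    if (this.writerClosed) return;
    this.writerClosed = true;
    void this.writer.close();
  }
}

const session = new ReattachableDeferredStream<string>();
const firstStream = session.stream;

session.setSource(new ReadableStream<string>());
session.detachSource();
const detached = !session.isSourceSet;

session.closeWriter();
session.setSource(new ReadableStream<string>()); // no longer throws
const recreated = session.stream !== firstStream;
console.log(detached, recreated); // prints: true true
```

One consequence of recreating the transform in this sketch is that `stream` returns a different object afterwards, so a consumer holding a reader on the earlier stream would have to acquire a new one.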
Note to reviewers: Please ensure the pre-review checklist is completed before starting your review.